1 /*
2  * Collie - An asynchronous event-driven network framework using Dlang development
3  *
4  * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd 
5  *
6  * Developer: putao's Dlang team
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module collie.codec.mqtt.mqttdecoder;
12 
13 import std.stdio;
14 import std.array;
15 import std.conv;
16 import std.experimental.allocator;
17 import std.experimental.allocator.gc_allocator;
18 import collie.codec.messagetobyteencoder;
19 import kiss.container.Vector;
20 import collie.codec.mqtt.bytebuf;
21 import collie.codec.mqtt.mqttcodecutil;
22 import collie.codec.mqtt.mqttconnackmessage;
23 import collie.codec.mqtt.mqttconnackvariableheader;
24 import collie.codec.mqtt.mqttconnectmsg;
25 import collie.codec.mqtt.mqttconnectpayload;
26 import collie.codec.mqtt.mqttconnectreturncode;
27 import collie.codec.mqtt.mqttconnectvariableheader;
28 import collie.codec.mqtt.mqttfixedheader;
29 import collie.codec.mqtt.mqttmsg;
30 import collie.codec.mqtt.mqttmsgidvariableheader;
31 import collie.codec.mqtt.mqttmsgtype;
32 import collie.codec.mqtt.mqttpubackmsg;
33 import collie.codec.mqtt.mqttpublishmsg;
34 import collie.codec.mqtt.mqttpublishpayload;
35 import collie.codec.mqtt.mqttpublishvariableheader;
36 import collie.codec.mqtt.mqttqos;
37 import collie.codec.mqtt.mqttsubackmsg;
38 import collie.codec.mqtt.mqttsubackpayload;
39 import collie.codec.mqtt.mqttsubscribemsg;
40 import collie.codec.mqtt.mqttsubscribepayload;
41 import collie.codec.mqtt.mqtttopicsubscription;
42 import collie.codec.mqtt.mqttunsubscribemsg;
43 import collie.codec.mqtt.mqttunsubscribepayload;
44 import collie.codec.mqtt.mqttversion;
45 
46 import collie.channel.handler;
47 import collie.codec.bytetomessagedecoder;
48 
49  final class Result(T) {
50 	
51 	this(T _value, int _numberOfBytesConsumed) {
52 		this.value = _value;
53 		this.numberOfBytesConsumed = _numberOfBytesConsumed;
54 	}
55 private:
56 	 T value;
57 	 int numberOfBytesConsumed;
58 }
59 
60  class MqttDecoder :ByteToMessageDecoder!(MqttMsg[]) {
61 
62 public:
63 	 this() {
64 		this(DEFAULT_MAX_BYTES_IN_MESSAGE);
65 	}
66 	
67 	 this(int maxBytesInMessage) {
68 		//super(DecoderState.READ_FIXED_HEADER);
69 		_curstat = DecoderState.READ_FIXED_HEADER;
70 		this.maxBytesInMessage = maxBytesInMessage;
71 	}
72 
73 	override void read(Context ctx, ubyte[] msg)
74 	{
75 		bool success = true;
76 		MqttMsg[] result;
77 		success = decode(ctx, msg, result);
78 		if (success)
79 		{
80 			ctx.fireRead(result);
81 		}
82 	}
83 
84 	override bool decode(Context ctx, ubyte[] buf, ref MqttMsg[] mqs) {
85 		bool res = true;
86 		ByteBuf buffer = new ByteBuf(buf);
87 		//writeln("new bytebuf readerindex : --> ",buffer.readerIndex()," writeindex : ",buffer.writerIndex());
88 		//writeln(buffer.data);
89 		//writeln("new bytebuf len :",buffer.length," buf len : ",buf.length);
90 		while(1)
91 		{
92 			if(_curstat == DecoderState.BAD_MESSAGE || _curstat == DecoderState.DECODE_FINISH)
93 				break;
94 			switch (_curstat) {
95 				case DecoderState.READ_FIXED_HEADER:
96 					//writeln("decodeFixedHeader before ------------");
97 					mqttFixedHeader = decodeFixedHeader(buffer);
98 					bytesRemainingInVariablePart = mqttFixedHeader.remainingLength();
99 					_curstat = DecoderState.READ_VARIABLE_HEADER;
100 					// fall through
101 					break;
102 				case DecoderState.READ_VARIABLE_HEADER:
103 					if (bytesRemainingInVariablePart > maxBytesInMessage) {
104 						throw new Exception("too large message: " ~ to!string(bytesRemainingInVariablePart) ~ " bytes");
105 					}
106 					//writeln("decodeVariableHeader before ------------");
107 					// Result!(Object) decodedVariableHeader = decodeVariableHeader(buffer, mqttFixedHeader);
108 					switch (mqttFixedHeader.messageType()) {
109 						case MqttMsgType.CONNECT:
110 							auto decodedVariableHeader = decodeConnectionVariableHeader(buffer);
111 							variableHeader = decodedVariableHeader.value;
112 							bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
113 							break;
114 							
115 						case MqttMsgType.CONNACK:
116 							auto decodedVariableHeader = decodeConnAckVariableHeader(buffer);
117 							variableHeader = decodedVariableHeader.value;
118 							bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
119 							break;
120 							
121 						case MqttMsgType.SUBSCRIBE:
122 						case MqttMsgType.UNSUBSCRIBE:
123 						case MqttMsgType.SUBACK:
124 						case MqttMsgType.UNSUBACK:
125 						case MqttMsgType.PUBACK:
126 						case MqttMsgType.PUBREC:
127 						case MqttMsgType.PUBCOMP:
128 						case MqttMsgType.PUBREL:
129 							auto decodedVariableHeader = decodeMessageIdVariableHeader(buffer);
130 							variableHeader = decodedVariableHeader.value;
131 							bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
132 							break;
133 							
134 						case MqttMsgType.PUBLISH:
135 							auto decodedVariableHeader = decodePublishVariableHeader(buffer, mqttFixedHeader);
136 							variableHeader = decodedVariableHeader.value;
137 							bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
138 							break;
139 							
140 						case MqttMsgType.PINGREQ:
141 						case MqttMsgType.PINGRESP:
142 						case MqttMsgType.DISCONNECT:
143 							// Empty variable header
144 
145 						default:
146 							auto decodedVariableHeader = new Result!(Object)(null, 0);
147 							variableHeader = decodedVariableHeader.value;
148 							bytesRemainingInVariablePart -= decodedVariableHeader.numberOfBytesConsumed;
149 							break;
150 					}
151 
152 					_curstat = DecoderState.READ_PAYLOAD;
153 
154 					// fall through
155 					break;
156 				case DecoderState.READ_PAYLOAD:
157 					//writeln("DecoderState.READ_PAYLOAD  begin------------");
158 //					 Result!(Object) decodedPayload =
159 //						decodePayload(
160 //							buffer,
161 //							mqttFixedHeader.messageType(),
162 //							bytesRemainingInVariablePart,
163 //							variableHeader);
164 //					payload = decodedPayload.value;
165 //					bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;
166 
167 					switch (mqttFixedHeader.messageType()) {
168 						case MqttMsgType.CONNECT:
169 							auto  decodedPayload = decodeConnectionPayload(buffer, cast(MqttConnectVariableHeader) variableHeader);
170 							payload = decodedPayload.value;
171 							bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;
172 							mqs ~= new MqttConnectMsg(mqttFixedHeader, cast(MqttConnectVariableHeader)variableHeader, cast(MqttConnectPayload)payload);
173 							break;
174 							
175 						case MqttMsgType.SUBSCRIBE:
176 							auto  decodedPayload = decodeSubscribePayload(buffer, bytesRemainingInVariablePart);
177 							payload = decodedPayload.value;
178 							bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;
179 							mqs ~= new MqttSubscribeMsg(mqttFixedHeader, cast(MqttMsgIdVariableHeader)variableHeader, cast(MqttSubscribePayload)payload);
180 							break;
181 
182 						case MqttMsgType.SUBACK:
183 							auto  decodedPayload = decodeSubackPayload(buffer, bytesRemainingInVariablePart);
184 							payload = decodedPayload.value;
185 							bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;
186 							mqs ~= new MqttSubAckMsg(mqttFixedHeader, cast(MqttMsgIdVariableHeader)variableHeader, cast(MqttSubAckPayload)payload);
187 							break;
188 							
189 						case MqttMsgType.UNSUBSCRIBE:
190 							auto  decodedPayload = decodeUnsubscribePayload(buffer, bytesRemainingInVariablePart);
191 							payload = decodedPayload.value;
192 							bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;
193 							mqs ~= new MqttUnsubscribeMsg(mqttFixedHeader, cast(MqttMsgIdVariableHeader)variableHeader, cast(MqttUnsubscribePayload)payload);
194 							break;
195 							
196 						case MqttMsgType.PUBLISH:
197 							auto  decodedPayload = decodePublishPayload(buffer, bytesRemainingInVariablePart);
198 							payload = decodedPayload.value;
199 							bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;
200 							mqs ~= new MqttPublishMsg(mqttFixedHeader, cast(MqttPublishVariableHeader)variableHeader, cast(MqttPublishPayload)payload);
201 							break;
202 						case MqttMsgType.CONNACK:
203 							mqs ~= new MqttConnAckMessage(mqttFixedHeader, cast(MqttConnAckVariableHeader)variableHeader);
204 							break;
205 						default:
206 							// unknown payload , no byte consumed
207 							auto  decodedPayload = new Result!(Object)(null, 0);
208 							payload = decodedPayload.value;
209 							bytesRemainingInVariablePart -= decodedPayload.numberOfBytesConsumed;
210 							mqs ~= new MqttMsg(mqttFixedHeader, variableHeader, payload);
211 							break;
212 
213 					}
214 
215 					if (bytesRemainingInVariablePart != 0) {
216 						throw new Exception(
217 							"non-zero remaining payload bytes: " ~
218 							to!string(bytesRemainingInVariablePart) ~ " (" ~ to!string(mqttFixedHeader.messageType()) ~ ')');
219 					}
220 
221 					mqttFixedHeader = null;
222 					variableHeader = null;
223 					payload = null;
224 
225 					//writeln(" bytebuf readerindex : --> ",buffer.readerIndex()," writeindex : ",buffer.writerIndex(),"  len : ",buffer.length);
226 					if(buffer.readerIndex == buffer.length) //解码完毕
227 						_curstat = DecoderState.DECODE_FINISH;
228 					else
229 						_curstat = DecoderState.READ_FIXED_HEADER; //解析下条消息
230 
231 					break;
232 					
233 				case DecoderState.BAD_MESSAGE:
234 					// Keep discarding until disconnection.
235 					buffer.skipBytes(cast(int)buffer.length);
236 					break;
237 					
238 				default:
239 					// Shouldn't reach here.
240 					throw new Exception("decoder error");
241 			}
242 		}
243 
244 		_curstat = DecoderState.READ_FIXED_HEADER;
245 
246 		return res;
247 	}
248 
249 private:
250 //	 MqttMsg invalidMessage(Throwable cause) {
251 //		checkpoint(DecoderState.BAD_MESSAGE);
252 //		return MqttMessageFactory.newInvalidMessage(cause);
253 //	}
254 	
255 	/**
256      * Decodes the fixed header. It's one byte for the flags and then variable bytes for the remaining length.
257      *
258      * @param buffer the buffer to decode from
259      * @return the fixed header
260      */
261 	 static MqttFixedHeader decodeFixedHeader(ref ByteBuf buffer) {
262 		short b1 = buffer.readUnsignedByte();
263 		
264 		MqttMsgType messageType = to!MqttMsgType(b1 >> 4);
265 		bool dupFlag = (b1 & 0x08) == 0x08;
266 		int qosLevel = (b1 & 0x06) >> 1;
267 		bool retain = (b1 & 0x01) != 0;
268 		
269 		int remainingLength = 0;
270 		int multiplier = 1;
271 		short digit;
272 		int loops = 0;
273 		do {
274 			digit = buffer.readUnsignedByte();
275 			remainingLength += (digit & 127) * multiplier;
276 			multiplier *= 128;
277 			loops++;
278 		} while ((digit & 128) != 0 && loops < 4);
279 		
280 		// MQTT protocol limits Remaining Length to 4 bytes
281 		if (loops == 4 && (digit & 128) != 0) {
282 			throw new Exception("remaining length exceeds 4 digits (" ~ to!string(messageType) ~ ')');
283 		}
284 		MqttFixedHeader decodedFixedHeader =
285 			new MqttFixedHeader(messageType, dupFlag, to!(MqttQoS)(qosLevel), retain, remainingLength);
286 		return MqttCodecUtil.validateFixedHeader(MqttCodecUtil.resetUnusedFields(decodedFixedHeader));
287 	}
288 
289 	/**
290      * Decodes the variable header (if any)
291      * @param buffer the buffer to decode from
292      * @param mqttFixedHeader MqttFixedHeader of the same message
293      * @return the variable header
294      */
295 	 static Result!(Object) decodeVariableHeader(ref ByteBuf buffer, MqttFixedHeader mqttFixedHeader) {
296 		switch (mqttFixedHeader.messageType()) {
297 			case MqttMsgType.CONNECT:
298 				return cast(Result!(Object))(decodeConnectionVariableHeader(buffer));
299 				
300 			case MqttMsgType.CONNACK:
301 				return to!(Result!(Object))(decodeConnAckVariableHeader(buffer));
302 				
303 			case MqttMsgType.SUBSCRIBE:
304 			case MqttMsgType.UNSUBSCRIBE:
305 			case MqttMsgType.SUBACK:
306 			case MqttMsgType.UNSUBACK:
307 			case MqttMsgType.PUBACK:
308 			case MqttMsgType.PUBREC:
309 			case MqttMsgType.PUBCOMP:
310 			case MqttMsgType.PUBREL:
311 				return to!(Result!(Object))(decodeMessageIdVariableHeader(buffer));
312 				
313 			case MqttMsgType.PUBLISH:
314 				return to!(Result!(Object))(decodePublishVariableHeader(buffer, mqttFixedHeader));
315 				
316 			case MqttMsgType.PINGREQ:
317 			case MqttMsgType.PINGRESP:
318 			case MqttMsgType.DISCONNECT:
319 				// Empty variable header
320 				return new Result!(Object)(null, 0);
321 			default:
322 				return new Result!(Object)(null, 0);
323 		}
324 
325 	}
326 
327 	 static Result!(MqttConnectVariableHeader) decodeConnectionVariableHeader(ref ByteBuf buffer) {
328 		Result!(string) protoString = decodeString(buffer);
329 		int numberOfBytesConsumed = protoString.numberOfBytesConsumed;
330 		
331 		byte protocolLevel = buffer.readByte();
332 		numberOfBytesConsumed += 1;
333 		//writeln("decoder ---> ",to!string(protoString.value), protocolLevel);
334 		MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(cast(string)protoString.value, protocolLevel);
335 		
336 		int b1 = buffer.readUnsignedByte();
337 		numberOfBytesConsumed += 1;
338 		
339 		Result!(int) keepAlive = decodeMsbLsb(buffer);
340 		numberOfBytesConsumed += keepAlive.numberOfBytesConsumed;
341 		
342 		bool hasUserName = (b1 & 0x80) == 0x80;
343 		bool hasPassword = (b1 & 0x40) == 0x40;
344 		bool willRetain = (b1 & 0x20) == 0x20;
345 		int willQos = (b1 & 0x18) >> 3;
346 		bool willFlag = (b1 & 0x04) == 0x04;
347 		bool cleanSession = (b1 & 0x02) == 0x02;
348 		if (mqttVersion.protocolName() == "MQTT") {
349 			bool zeroReservedFlag = (b1 & 0x01) == 0x0;
350 			if (!zeroReservedFlag) {
351 				// MQTT v3.1.1: The Server MUST validate that the reserved flag in the CONNECT Control Packet is
352 				// set to zero and disconnect the Client if it is not zero.
353 				// See http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349230
354 				throw new Exception("non-zero reserved flag");
355 			}
356 		}
357 		
358 		MqttConnectVariableHeader mqttConnectVariableHeader = new MqttConnectVariableHeader(
359 			mqttVersion.protocolName(),
360 			mqttVersion.protocolLevel(),
361 			hasUserName,
362 			hasPassword,
363 			willRetain,
364 			willQos,
365 			willFlag,
366 			cleanSession,
367 			keepAlive.value);
368 		return new Result!(MqttConnectVariableHeader)(mqttConnectVariableHeader, numberOfBytesConsumed);
369 	}
370 	
371 	 static Result!(MqttConnAckVariableHeader) decodeConnAckVariableHeader(ref ByteBuf buffer) {
372 		bool sessionPresent = (buffer.readUnsignedByte() & 0x01) == 0x01;
373 		byte returnCode = cast(byte)buffer.readByte();
374 		int numberOfBytesConsumed = 2;
375 		MqttConnAckVariableHeader mqttConnAckVariableHeader =
376 			new MqttConnAckVariableHeader(to!(MqttConnectReturnCode)(returnCode), sessionPresent);
377 		return new Result!(MqttConnAckVariableHeader)(mqttConnAckVariableHeader, numberOfBytesConsumed);
378 	}
379 	
380 	 static Result!(MqttMsgIdVariableHeader) decodeMessageIdVariableHeader(ref ByteBuf buffer) {
381 		Result!(int) messageId = decodeMessageId(buffer);
382 		return new Result!(MqttMsgIdVariableHeader)(
383 			to!MqttMsgIdVariableHeader(MqttMsgIdVariableHeader.from(messageId.value)),
384 			messageId.numberOfBytesConsumed);
385 	}
386 	
387 	 static Result!(MqttPublishVariableHeader) decodePublishVariableHeader(
388 		ref ByteBuf buffer,
389 		MqttFixedHeader mqttFixedHeader) {
390 		Result!(string) decodedTopic = decodeString(buffer);
391 		if (!MqttCodecUtil.isValidPublishTopicName(decodedTopic.value)) {
392 			throw new Exception("invalid publish topic name: " ~ decodedTopic.value ~ " (contains wildcards)");
393 		}
394 		int numberOfBytesConsumed = decodedTopic.numberOfBytesConsumed;
395 		
396 		int messageId = -1;
397 		if (to!int(mqttFixedHeader.qosLevel()) > 0) {
398 			Result!(int) decodedMessageId = decodeMessageId(buffer);
399 			messageId = decodedMessageId.value;
400 			numberOfBytesConsumed += decodedMessageId.numberOfBytesConsumed;
401 		}
402 		MqttPublishVariableHeader mqttPublishVariableHeader =
403 			new MqttPublishVariableHeader(decodedTopic.value, messageId);
404 		return new Result!(MqttPublishVariableHeader)(mqttPublishVariableHeader, numberOfBytesConsumed);
405 	}
406 	
407 	 static Result!(int) decodeMessageId(ref ByteBuf buffer) {
408 		Result!(int) messageId = decodeMsbLsb(buffer);
409 		if (!MqttCodecUtil.isValidMessageId(messageId.value)) {
410 			throw new Exception("invalid messageId: " ~ to!string(messageId.value));
411 		}
412 		return messageId;
413 	}
414 	
415 	/**
416      * Decodes the payload.
417      *
418      * @param buffer the buffer to decode from
419      * @param messageType  type of the message being decoded
420      * @param bytesRemainingInVariablePart bytes remaining
421      * @param variableHeader variable header of the same message
422      * @return the payload
423      */
424 	 static Result!(Object) decodePayload(
425 		ref ByteBuf buffer,
426 		MqttMsgType messageType,
427 		int bytesRemainingInVariablePart,
428 		Object variableHeader) {
429 		switch (messageType) {
430 			case MqttMsgType.CONNECT:
431 				return to!(Result!(Object))(decodeConnectionPayload(buffer, cast(MqttConnectVariableHeader) variableHeader));
432 				
433 			case MqttMsgType.SUBSCRIBE:
434 				return to!(Result!(Object))(decodeSubscribePayload(buffer, bytesRemainingInVariablePart));
435 				
436 			case MqttMsgType.SUBACK:
437 				return to!(Result!(Object))(decodeSubackPayload(buffer, bytesRemainingInVariablePart));
438 				
439 			case MqttMsgType.UNSUBSCRIBE:
440 				return to!(Result!(Object))(decodeUnsubscribePayload(buffer, bytesRemainingInVariablePart));
441 				
442 			case MqttMsgType.PUBLISH:
443 				return to!(Result!(Object))(decodePublishPayload(buffer, bytesRemainingInVariablePart));
444 				
445 			default:
446 				// unknown payload , no byte consumed
447 				return new Result!(Object)(null, 0);
448 		}
449 	}
450 	
451 	 static Result!(MqttConnectPayload) decodeConnectionPayload(
452 		ref ByteBuf buffer,
453 		MqttConnectVariableHeader mqttConnectVariableHeader) {
454 		//writeln("decodeConnectionPayload ---begin----");
455 		Result!(string) decodedClientId = decodeString(buffer);
456 
457 		string decodedClientIdValue = decodedClientId.value;
458 
459 		MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(mqttConnectVariableHeader.name(),
460 			 to!byte(mqttConnectVariableHeader.mqtt_version()));
461 		if (!MqttCodecUtil.isValidClientId(mqttVersion, decodedClientIdValue)) {
462 			throw new Exception("invalid clientIdentifier: " ~ decodedClientIdValue);
463 		}
464 		int numberOfBytesConsumed = decodedClientId.numberOfBytesConsumed;
465 		
466 		Result!(string) decodedWillTopic = null;
467 		Result!(string) decodedWillMessage = null;
468 		if (mqttConnectVariableHeader.isWillFlag()) {
469 
470 			decodedWillTopic = decodeString(buffer, 0, 32767);
471 			//writeln("decodeConnectionPayload ---end----");
472 			numberOfBytesConsumed += decodedWillTopic.numberOfBytesConsumed;
473 			decodedWillMessage = decodeAsciiString(buffer);
474 			numberOfBytesConsumed += decodedWillMessage.numberOfBytesConsumed;
475 		}
476 		Result!(string) decodedUserName = null;
477 		Result!(string) decodedPassword = null;
478 		if (mqttConnectVariableHeader.hasUserName()) {
479 			decodedUserName = decodeString(buffer);
480 			numberOfBytesConsumed += decodedUserName.numberOfBytesConsumed;
481 		}
482 		if (mqttConnectVariableHeader.hasPassword()) {
483 			decodedPassword = decodeString(buffer);
484 			numberOfBytesConsumed += decodedPassword.numberOfBytesConsumed;
485 		}
486 		
487 		MqttConnectPayload mqttConnectPayload =
488 			new MqttConnectPayload(
489 				decodedClientId.value,
490 				decodedWillTopic !is null ? decodedWillTopic.value : null,
491 				decodedWillMessage !is null ? decodedWillMessage.value : null,
492 				decodedUserName !is null ? decodedUserName.value : null,
493 				decodedPassword !is null ? decodedPassword.value : null);
494 		return new Result!(MqttConnectPayload)(mqttConnectPayload, numberOfBytesConsumed);
495 	}
496 	
497 	 static Result!(MqttSubscribePayload) decodeSubscribePayload(
498 		ref ByteBuf buffer,
499 		int bytesRemainingInVariablePart) {
500 		MqttTopicSubscription[] subscribeTopics ;
501 		int numberOfBytesConsumed = 0;
502 		while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
503 			Result!(string) decodedTopicName = decodeString(buffer);
504 			numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
505 			int qos = buffer.readUnsignedByte() & 0x03;
506 			numberOfBytesConsumed++;
507 			subscribeTopics ~= new MqttTopicSubscription(decodedTopicName.value, to!MqttQoS(qos));
508 		}
509 		return new Result!(MqttSubscribePayload)(new MqttSubscribePayload(subscribeTopics), numberOfBytesConsumed);
510 	}
511 
512 	 static Result!(MqttSubAckPayload) decodeSubackPayload(
513 		ref ByteBuf buffer,
514 		int bytesRemainingInVariablePart) {
515 		int[] grantedQos ;
516 		int numberOfBytesConsumed = 0;
517 		while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
518 			int qos = buffer.readUnsignedByte() & 0x03;
519 			numberOfBytesConsumed++;
520 			grantedQos ~= qos;
521 		}
522 		return new Result!(MqttSubAckPayload)(new MqttSubAckPayload(grantedQos), numberOfBytesConsumed);
523 	}
524 	
525 	 static Result!(MqttUnsubscribePayload) decodeUnsubscribePayload(
526 		ref ByteBuf buffer,
527 		int bytesRemainingInVariablePart) {
528 		string[] unsubscribeTopics;
529 		int numberOfBytesConsumed = 0;
530 		while (numberOfBytesConsumed < bytesRemainingInVariablePart) {
531 			Result!(string) decodedTopicName = decodeString(buffer);
532 			numberOfBytesConsumed += decodedTopicName.numberOfBytesConsumed;
533 			unsubscribeTopics ~= decodedTopicName.value;
534 		}
535 		return new Result!(MqttUnsubscribePayload)(
536 			new MqttUnsubscribePayload(unsubscribeTopics),
537 			numberOfBytesConsumed);
538 	}
539 	
540 	static Result!(MqttPublishPayload) decodePublishPayload(ref ByteBuf buffer, int bytesRemainingInVariablePart) {
541 		ByteBuf b = buffer.readSlice(bytesRemainingInVariablePart);
542 		return new Result!(MqttPublishPayload)(new MqttPublishPayload(b.data()), bytesRemainingInVariablePart);
543 	}
544 
545 	 static Result!(string) decodeString(ref ByteBuf buffer) {
546 		return decodeString(buffer, 0, int.max);
547 	}
548 	
549 	 static Result!(string) decodeAsciiString(ref ByteBuf buffer) {
550 		Result!(string) result = decodeString(buffer, 0, int.max);
551 		string s = result.value;
552 		for (int i = 0; i < s.length; i++) {
553 			if (s[i] > 127) {
554 				return new Result!(string)(null, result.numberOfBytesConsumed);
555 			}
556 		}
557 		return new Result!(string)(s, result.numberOfBytesConsumed);
558 	}
559 	
560 	 static Result!(string) decodeString(ref ByteBuf buffer, int minBytes, int maxBytes) {
561 		Result!(int) decodedSize = decodeMsbLsb(buffer);
562 		int size = decodedSize.value;
563 		int numberOfBytesConsumed = decodedSize.numberOfBytesConsumed;
564 		if (size < minBytes || size > maxBytes) {
565 			buffer.skipBytes(size);
566 			numberOfBytesConsumed += size;
567 			return new Result!(string)(null, numberOfBytesConsumed);
568 		}
569 		string s = buffer.toString(buffer.readerIndex(), size);
570 		buffer.skipBytes(size);
571 		numberOfBytesConsumed += size;
572 		return new Result!(string)(s, numberOfBytesConsumed);
573 	}
574 	
575 	 static Result!(int) decodeMsbLsb(ref ByteBuf buffer) {
576 		return decodeMsbLsb(buffer, 0, 65535);
577 	}
578 	
579 	 static Result!(int) decodeMsbLsb(ref ByteBuf buffer, int min, int max) {
580 		short msbSize = buffer.readUnsignedByte();
581 		short lsbSize = buffer.readUnsignedByte();
582 		int numberOfBytesConsumed = 2;
583 
584 		int result = msbSize << 8 | lsbSize;
585 		if (result < min || result > max) {
586 			result = -1;
587 		}
588 		//writeln("msbSize : ",msbSize, " lsbSize : ",lsbSize, " result : ",result);
589 		return new Result!(int)(result, numberOfBytesConsumed);
590 	}
591 
592 public:
593 //	MqttMsgType getMsgType()
594 //	{
595 //		if(mqttFixedHeader !is null)
596 //			return mqttFixedHeader.messageType();
597 //		return MqttMsgType.UNKNOWN;
598 //	}
599 
600 private:
601 	 static  int DEFAULT_MAX_BYTES_IN_MESSAGE = 8092;
602 	
603 	/**
604      * States of the decoder.
605      * We start at READ_FIXED_HEADER, followed by
606      * READ_VARIABLE_HEADER and finally READ_PAYLOAD.
607      */
608 	enum DecoderState {
609 		READ_FIXED_HEADER,
610 		READ_VARIABLE_HEADER,
611 		READ_PAYLOAD,
612 		BAD_MESSAGE,
613 		DECODE_FINISH,
614 	}
615 	 DecoderState _curstat;
616 	 MqttFixedHeader mqttFixedHeader;
617 	 Object variableHeader;
618 	 Object payload;
619 	 int bytesRemainingInVariablePart;
620 	
621 	 int maxBytesInMessage;
622 }
623